use std::{
    fmt,
    fmt::Display,
    str::FromStr,
    time::Duration,
};

use serde::{
    ser::SerializeMap,
    Deserialize,
    Serialize,
    Serializer,
};
use serde_json::{
    json,
    Value as JsonValue,
};
use tonic::async_trait;
use utoipa::ToSchema;
use value::heap_size::HeapSize;

use crate::{
    components::ComponentPath,
    errors::JsError,
    execution_context::ExecutionContext,
    log_lines::LogLineStructured,
    runtime::{
        Runtime,
        UnixTimestamp,
    },
    types::{
        ModuleEnvironment,
        UdfType,
        UdfTypeJson,
    },
};

/// Public worker for the LogManager.
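///
/// A minimal sketch of an implementation; a real sink would buffer, batch,
/// and retry (the `StdoutSink` type here is hypothetical):
///
/// ```ignore
/// struct StdoutSink;
///
/// #[async_trait]
/// impl LogSender for StdoutSink {
///     fn send_logs(&self, logs: Vec<LogEvent>) {
///         for log in logs {
///             println!("{log:?}");
///         }
///     }
///
///     async fn shutdown(&self) -> anyhow::Result<()> {
///         Ok(())
///     }
/// }
/// ```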
#[async_trait]
pub trait LogSender: Send + Sync {
    fn send_logs(&self, logs: Vec<LogEvent>);
    async fn shutdown(&self) -> anyhow::Result<()>;
}

/// A structured log event destined for configured log sinks.
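///
/// A minimal construction sketch (`runtime` is any [`Runtime`]):
///
/// ```ignore
/// let event = LogEvent {
///     timestamp: runtime.unix_timestamp(),
///     event: StructuredLogEvent::Verification,
/// };
/// ```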
#[derive(Debug, Clone)]
pub struct LogEvent {
    /// Rough timestamp of when this event was created, for the user's benefit.
    /// We provide no guarantees on the consistency of this timestamp across
    /// topics and log sources - it's best-effort.
    /// This timestamp is serialized to milliseconds.
    pub timestamp: UnixTimestamp,
    pub event: StructuredLogEvent,
}

/// User-facing UDF stats that are logged in the UDF execution log
/// and may be used for debugging purposes.
///
/// TODO(sarah) this is nearly identical to the type in the `usage_tracking`
/// crate, but there's a dependency cycle preventing us from using it directly.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct AggregatedFunctionUsageStats {
    pub database_read_bytes: u64,
    pub database_write_bytes: u64,
    pub database_read_documents: u64,
    pub storage_read_bytes: u64,
    pub storage_write_bytes: u64,
    pub vector_index_read_bytes: u64,
    pub vector_index_write_bytes: u64,
    pub memory_used_mb: u64,
    pub return_bytes: Option<u64>,
}

#[derive(Serialize, Debug, Clone)]
pub struct OccInfo {
    pub table_name: Option<String>,
    pub document_id: Option<String>,
    pub write_source: Option<String>,
    pub retry_count: u64,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(any(test, feature = "testing"), derive(utoipa::ToSchema))]
#[serde(rename_all = "camelCase")]
pub struct OccInfoJson {
    pub table_name: Option<String>,
    pub document_id: Option<String>,
    pub write_source: Option<String>,
    pub retry_count: u64,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(any(test, feature = "testing"), derive(utoipa::ToSchema))]
#[serde(rename_all = "camelCase")]
pub struct UsageStatsJson {
    pub database_read_bytes: u64,
    pub database_write_bytes: u64,
    pub database_read_documents: u64,
    pub storage_read_bytes: u64,
    pub storage_write_bytes: u64,
    pub vector_index_read_bytes: u64,
    pub vector_index_write_bytes: u64,
    pub memory_used_mb: u64,
}

// Currently just the job ID. Can add more information like the parent
// scheduled job, scheduler lag, etc.
#[derive(Serialize, Debug, Clone)]
pub struct SchedulerInfo {
    pub job_id: String,
}

// When adding a new event type:
// - add a Schema type in the tests at the bottom of this file
// - consider adding formatting of it in the CLI
// - add it to the docs
//
// Also consider getting rid of the V1 format!
#[derive(Debug, Clone)]
pub enum StructuredLogEvent {
    /// Topic for verification logs. These are issued on sink startup and are
    /// used to test that the backend can authenticate with the sink.
    Verification,
    /// Topic for logs generated by `console.*` events. This is considered a
    /// `SystemLogTopic` since the topic is generated by the backend.
    Console {
        source: FunctionEventSource,
        log_line: LogLineStructured,
    },
    /// Topic that records UDF executions and provides information on the
    /// execution.
    FunctionExecution {
        source: FunctionEventSource,
        error: Option<JsError>,
        execution_time: Duration,
        usage_stats: AggregatedFunctionUsageStats,
        occ_info: Option<OccInfo>,
        scheduler_info: Option<SchedulerInfo>,
    },
    /// Topic for exceptions. These happen when a UDF raises an exception from
    /// JS.
    Exception {
        error: JsError,
        user_identifier: Option<sync_types::UserIdentifier>,
        source: FunctionEventSource,
        udf_server_version: Option<semver::Version>,
    },
    /// Topic for deployment audit logs. These are issued when developers
    /// interact with a deployment.
    DeploymentAuditLog {
        action: String,
        metadata: serde_json::Map<String, JsonValue>,
    },
    /// Topic for global stats from the scheduler. For function-specific stats,
    /// look in `FunctionExecution`.
    SchedulerStats {
        lag_seconds: Duration,
        num_running_jobs: u64,
    },
    ScheduledJobLag {
        lag_seconds: Duration,
    },
    /// Topic for storage usage metrics. These are periodic snapshots of current
    /// storage state aggregated across all tables.
    CurrentStorageUsage {
        total_document_size_bytes: u64,
        total_index_size_bytes: u64,
        total_vector_storage_bytes: u64,
        total_file_storage_bytes: u64,
        total_backup_storage_bytes: u64,
    },
    // User-specified topics -- not yet implemented.
    // See here for more details: https://www.notion.so/Log-Streaming-in-Convex-19a1dfadd6924c33b29b2796b0f5b2e2
    // User {
    //     topic: String,
    //     payload: serde_json::Map<String, JsonValue>
    // },
}

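/// Wire-format version used when serializing a [`LogEvent`] for log sinks.
/// Parsed from and displayed as the bare strings `"1"` and `"2"`:
///
/// ```ignore
/// let v: LogEventFormatVersion = "2".parse()?;
/// assert_eq!(v.to_string(), "2");
/// ```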
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize, ToSchema)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub enum LogEventFormatVersion {
    V1,
    V2,
}

impl FromStr for LogEventFormatVersion {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "1" => Ok(Self::V1),
            "2" => Ok(Self::V2),
            v => anyhow::bail!("Invalid LogEventFormatVersion: {v}"),
        }
    }
}

impl Display for LogEventFormatVersion {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::V1 => write!(f, "1"),
            Self::V2 => write!(f, "2"),
        }
    }
}

#[cfg(any(test, feature = "testing"))]
impl Default for LogEventFormatVersion {
    fn default() -> Self {
        Self::V2
    }
}

impl LogEvent {
    pub fn default_for_verification<RT: Runtime>(runtime: &RT) -> anyhow::Result<Self> {
        Ok(Self {
            event: StructuredLogEvent::Verification,
            timestamp: runtime.unix_timestamp(),
        })
    }

    #[cfg(any(test, feature = "testing"))]
    pub fn sample_exception<RT: Runtime>(runtime: &RT) -> anyhow::Result<Self> {
        use sync_types::UserIdentifier;

        let source = FunctionEventSource {
            context: ExecutionContext::new_for_test(),
            component_path: ComponentPath::test_user(),
            udf_path: "test".to_string(),
            udf_type: UdfType::Action,
            module_environment: ModuleEnvironment::Isolate,
            cached: None,
            mutation_queue_length: None,
            mutation_retry_count: None,
        };
        Ok(Self {
            timestamp: runtime.unix_timestamp(),
            event: StructuredLogEvent::Exception {
                error: JsError::from_frames_for_test(
                    "test_message",
                    vec!["test_frame_1", "test_frame_2"],
                ),
                user_identifier: Some(UserIdentifier("test|user".to_string())),
                source,
                udf_server_version: Some(semver::Version::new(1, 5, 1)),
            },
        })
    }

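    /// Serializes this event to a JSON map in the given format version.
    ///
    /// A sketch of the V2 output for a verification event (key order is
    /// illustrative):
    ///
    /// ```ignore
    /// let fields = event.to_json_map(LogEventFormatVersion::V2)?;
    /// // {"timestamp": 1000, "topic": "verification",
    /// //  "message": "Convex connection test"}
    /// ```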
    pub fn to_json_map(
        &self,
        format: LogEventFormatVersion,
    ) -> anyhow::Result<serde_json::Map<String, JsonValue>> {
        let object = self.to_json_serializer(format, serde_json::value::Serializer)?;
        let JsonValue::Object(fields) = object else {
            unreachable!();
        };
        Ok(fields)
    }

    pub fn to_json_serializer<S: Serializer>(
        &self,
        format: LogEventFormatVersion,
        serializer: S,
    ) -> Result<S::Ok, S::Error> {
        let ms = self
            .timestamp
            .as_ms_since_epoch()
            .map_err(serde::ser::Error::custom)?;
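        // Serializes literal keys and expression values as one flat map.
        // A call like `serialize_map!({"a": x, "b": y})` expands to roughly:
        //
        //     let mut map_builder = serializer.serialize_map(None)?;
        //     map_builder.serialize_entry("a", &x)?;
        //     map_builder.serialize_entry("b", &y)?;
        //     map_builder.end()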
        macro_rules! serialize_map {
            ({$(
                $key:literal: $value:expr
            ),* $(,)?}) => {{
                let mut map_builder = serializer.serialize_map(None)?;
                $(map_builder.serialize_entry($key, &$value)?;)*
                map_builder.end()
            }}
        }
        match format {
            LogEventFormatVersion::V1 => match &self.event {
                StructuredLogEvent::Verification => {
                    serialize_map!({
                        "_timestamp": ms,
                        "_topic": "_verification",
                        "message": "Convex connection test"
                    })
                },
                StructuredLogEvent::Console { source, log_line } => {
                    serialize_map!({
                        "_timestamp": ms,
                        "_topic": "_console",
                        "_functionPath": source.udf_path,
                        "_functionType": source.udf_type,
                        "_functionCached": source.cached,
                        "message": log_line.to_pretty_string()
                    })
                },
                StructuredLogEvent::FunctionExecution {
                    source,
                    error,
                    execution_time,
                    usage_stats,
                    occ_info: _,
                    scheduler_info: _,
                } => {
                    let (reason, status) = match error {
                        Some(err) => (Some(err.to_string()), "failure"),
                        None => (None, "success"),
                    };
                    let execution_time_ms = execution_time.as_millis();
                    serialize_map!({
                        "_timestamp": ms,
                        "_topic": "_execution_record",
                        "_functionPath": source.udf_path,
                        "_functionType": source.udf_type,
                        "_functionCached": source.cached,
                        "status": status,
                        "reason": reason,
                        "executionTimeMs": execution_time_ms,
                        "databaseReadBytes": usage_stats.database_read_bytes,
                        "databaseWriteBytes": usage_stats.database_write_bytes,
                        "storageReadBytes": usage_stats.storage_read_bytes,
                        "storageWriteBytes": usage_stats.storage_write_bytes,
                    })
                },
                StructuredLogEvent::Exception {
                    error,
                    user_identifier,
                    source,
                    udf_server_version,
                } => {
                    let message = &error.message;
                    let frames: Option<Vec<String>> = error
                        .frames
                        .as_ref()
                        .map(|frames| frames.0.iter().map(|frame| frame.to_string()).collect());
                    serialize_map!({
                        "_timestamp": ms,
                        "_topic": "_exception",
                        "_functionPath": source.udf_path,
                        "_functionType": source.udf_type,
                        "_functionCached": source.cached,
                        "message": message,
                        "frames": frames,
                        "udfServerVersion": udf_server_version,
                        "userIdentifier": user_identifier,
                    })
                },
                StructuredLogEvent::DeploymentAuditLog { action, metadata } => {
                    serialize_map!({
                        "_timestamp": ms,
                        "_topic": "_audit_log",
                        "action": action,
                        "actionMetadata": metadata
                    })
                },
                StructuredLogEvent::SchedulerStats {
                    lag_seconds,
                    num_running_jobs,
                } => serialize_map!({
                    "_timestamp": ms,
                    "_topic": "_scheduler_stats",
                    "lag_seconds": lag_seconds.as_secs(),
                    "num_running_jobs": num_running_jobs
                }),
                StructuredLogEvent::ScheduledJobLag { lag_seconds } => {
                    serialize_map!({
                        "_timestamp": ms,
                        "_topic": "_scheduled_job_lag",
                        "lag_seconds": lag_seconds.as_secs()
                    })
                },
                StructuredLogEvent::CurrentStorageUsage {
                    total_document_size_bytes,
                    total_index_size_bytes,
                    total_vector_storage_bytes,
                    total_file_storage_bytes,
                    total_backup_storage_bytes,
                } => serialize_map!({
                    "_timestamp": ms,
                    "_topic": "_current_storage_usage",
                    "total_document_size_bytes": total_document_size_bytes,
                    "total_index_size_bytes": total_index_size_bytes,
                    "total_vector_storage_bytes": total_vector_storage_bytes,
                    "total_file_storage_bytes": total_file_storage_bytes,
                    "total_backup_storage_bytes": total_backup_storage_bytes,
                }),
            },
            LogEventFormatVersion::V2 => match &self.event {
                StructuredLogEvent::Verification => {
                    serialize_map!({
                        "timestamp": ms,
                        "topic": "verification",
                        "message": "Convex connection test"
                    })
                },
                StructuredLogEvent::Console { source, log_line } => {
                    let function_source = source.to_json_map();
                    let LogLineStructured {
                        messages,
                        level,
                        timestamp,
                        is_truncated,
                        system_metadata,
                    } = log_line;
                    let timestamp_ms = timestamp
                        .as_ms_since_epoch()
                        .map_err(serde::ser::Error::custom)?;
                    serialize_map!({
                        "timestamp": timestamp_ms,
                        "topic": "console",
                        "function": function_source,
                        "log_level": level.to_string(),
                        "message": messages.join(" "),
                        "is_truncated": is_truncated,
                        "system_code": system_metadata.as_ref().map(|s| &s.code)
                    })
                },
                StructuredLogEvent::FunctionExecution {
                    source,
                    error,
                    execution_time,
                    usage_stats,
                    occ_info,
                    scheduler_info,
                } => {
                    let function_source = source.to_json_map();
                    let (status, error_message) = match error {
                        Some(error) => ("failure", Some(error.to_string())),
                        None => ("success", None),
                    };
                    #[derive(Serialize)]
                    struct Usage {
                        database_read_bytes: u64,
                        database_write_bytes: u64,
                        database_read_documents: u64,
                        file_storage_read_bytes: u64,
                        file_storage_write_bytes: u64,
                        vector_storage_read_bytes: u64,
                        vector_storage_write_bytes: u64,
                        memory_used_mb: u64,
                        action_memory_used_mb: Option<u64>,
                    }
                    let action_memory_used_mb = if source.udf_type == UdfType::Action
                        || source.udf_type == UdfType::HttpAction
                    {
                        Some(usage_stats.memory_used_mb)
                    } else {
                        None
                    };
                    serialize_map!({
                        "timestamp": ms,
                        "topic": "function_execution",
                        "function": function_source,
                        "execution_time_ms": execution_time.as_millis(),
                        "status": status,
                        "error_message": error_message,
                        "occ_info": occ_info,
                        "scheduler_info": scheduler_info,
                        "usage": Usage {
                            database_read_bytes: usage_stats.database_read_bytes,
                            database_write_bytes: usage_stats.database_write_bytes,
                            database_read_documents: usage_stats.database_read_documents,
                            file_storage_read_bytes: usage_stats.storage_read_bytes,
                            file_storage_write_bytes: usage_stats.storage_write_bytes,
                            vector_storage_read_bytes: usage_stats.vector_index_read_bytes,
                            vector_storage_write_bytes: usage_stats.vector_index_write_bytes,
                            memory_used_mb: usage_stats.memory_used_mb,
                            action_memory_used_mb,
                        }
                    })
                },
                // This codepath is unused: we filter out exception logs in
                // `default_log_filter` and construct them in the Sentry sink.
                StructuredLogEvent::Exception {
                    error,
                    user_identifier,
                    source,
                    udf_server_version,
                } => {
                    let message = &error.message;
                    let frames: Option<Vec<String>> = error
                        .frames
                        .as_ref()
                        .map(|frames| frames.0.iter().map(|frame| frame.to_string()).collect());
                    serialize_map!({
                        "_timestamp": ms,
                        "_topic": "_exception",
                        "_functionPath": source.udf_path,
                        "_functionType": source.udf_type,
                        "_functionCached": source.cached,
                        "message": message,
                        "frames": frames,
                        "udfServerVersion": udf_server_version,
                        "userIdentifier": user_identifier,
                    })
                },
                StructuredLogEvent::DeploymentAuditLog { action, metadata } => {
                    serialize_map!({
                        "timestamp": ms,
                        "topic": "audit_log",
                        "audit_log_action": action,
                        // metadata is serialized to a JSON string rather than
                        // a nested object
                        "audit_log_metadata": serde_json::to_string(metadata).map_err(serde::ser::Error::custom)?
                    })
                },
                StructuredLogEvent::SchedulerStats {
                    lag_seconds,
                    num_running_jobs,
                } => {
                    serialize_map!({
                        "topic": "scheduler_stats",
                        "timestamp": ms,
                        "lag_seconds": lag_seconds.as_secs(),
                        "num_running_jobs": num_running_jobs
                    })
                },
                StructuredLogEvent::ScheduledJobLag { lag_seconds } => {
                    serialize_map!({
                        "timestamp": ms,
                        "topic": "scheduled_job_lag",
                        "lag_seconds": lag_seconds.as_secs()
                    })
                },
                StructuredLogEvent::CurrentStorageUsage {
                    total_document_size_bytes,
                    total_index_size_bytes,
                    total_vector_storage_bytes,
                    total_file_storage_bytes,
                    total_backup_storage_bytes,
                } => {
                    serialize_map!({
                        "timestamp": ms,
                        "topic": "current_storage_usage",
                        "total_document_size_bytes": total_document_size_bytes,
                        "total_index_size_bytes": total_index_size_bytes,
                        "total_vector_storage_bytes": total_vector_storage_bytes,
                        "total_file_storage_bytes": total_file_storage_bytes,
                        "total_backup_storage_bytes": total_backup_storage_bytes,
                    })
                },
            },
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub enum EventSource {
    Function(FunctionEventSource),
    System,
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub struct FunctionEventSource {
    pub context: ExecutionContext,
    pub component_path: ComponentPath,
    pub udf_path: String,
    pub udf_type: UdfType,
    pub module_environment: ModuleEnvironment,
    // Only queries can be cached, so this is only `Some` for queries. This is
    // important information to transmit to the client to distinguish logs users
    // explicitly created from logs generated by re-running a query when its
    // read set changes.
    pub cached: Option<bool>,
    // For mutations, this is the length of the mutation queue at the time the mutation was
    // executed. This is useful for monitoring and debugging mutation queue backlogs.
    pub mutation_queue_length: Option<usize>,
    // For mutations, this is the number of previous failed executions before a successful one.
    pub mutation_retry_count: Option<usize>,
}

impl FunctionEventSource {
    #[cfg(any(test, feature = "testing"))]
    pub fn new_for_test() -> Self {
        Self {
            context: ExecutionContext::new_for_test(),
            component_path: ComponentPath::test_user(),
            udf_path: "path/to/file:myFunction".to_string(),
            udf_type: UdfType::Mutation,
            module_environment: ModuleEnvironment::Isolate,
            cached: None,
            mutation_queue_length: None,
            mutation_retry_count: None,
        }
    }

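    /// Serializes this source into the `function` object attached to V2 log
    /// events. Illustrative output for a cached query in the root component
    /// (the `component_path` key is only present for non-root components):
    ///
    /// ```ignore
    /// // {"path": "test:test", "type": "query", "cached": true,
    /// //  "request_id": "...", "mutation_queue_length": null,
    /// //  "mutation_retry_count": null}
    /// ```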
    pub fn to_json_map(&self) -> serde_json::Map<String, JsonValue> {
        let udf_type = match self.udf_type {
            UdfType::Query => "query",
            UdfType::Mutation => "mutation",
            UdfType::Action => "action",
            UdfType::HttpAction => "http_action",
        };
        let JsonValue::Object(mut fields) = json!({
            "path": self.udf_path,
            "type": udf_type,
            "cached": self.cached,
            "request_id": self.context.request_id.to_string(),
            "mutation_queue_length": self.mutation_queue_length,
            "mutation_retry_count": self.mutation_retry_count,
        }) else {
            unreachable!()
        };
        if let Some(component_path_str) = self.component_path.clone().serialize() {
            fields.insert(
                "component_path".to_string(),
                JsonValue::String(component_path_str),
            );
        }
        fields
    }
}

impl HeapSize for FunctionEventSource {
    fn heap_size(&self) -> usize {
        self.component_path.heap_size()
            + self.udf_path.heap_size()
            + self.udf_type.heap_size()
            + self.cached.heap_size()
            + self.mutation_queue_length.heap_size()
            + self.mutation_retry_count.heap_size()
    }
}

// Types for the /api/stream_function_logs and /api/stream_udf_execution
// endpoints.
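// The `kind` field is a serde tag distinguishing the variants on the wire;
// an illustrative `Progress` entry looks like:
//
//     {"kind": "Progress", "udfType": "query", "componentPath": null,
//      "identifier": "test:test", "timestamp": 1700000000.0,
//      "logLines": [], "requestId": "...", "executionId": "..."}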
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(any(test, feature = "testing"), derive(utoipa::ToSchema))]
#[serde(tag = "kind")]
pub enum FunctionExecutionJson {
    #[serde(rename_all = "camelCase")]
    Completion {
        udf_type: UdfTypeJson,
        component_path: Option<String>,
        identifier: String,
        log_lines: Vec<JsonValue>,
        timestamp: f64,
        cached_result: bool,
        caller: String,
        parent_execution_id: Option<String>,
        execution_time: f64,
        success: Option<JsonValue>,
        error: Option<String>,
        request_id: String,
        execution_id: String,
        usage_stats: UsageStatsJson,
        return_bytes: Option<f64>,
        occ_info: Option<OccInfoJson>,
        execution_timestamp: f64,
        identity_type: String,
        environment: String,
    },
    #[serde(rename_all = "camelCase")]
    Progress {
        udf_type: UdfTypeJson,
        component_path: Option<String>,
        identifier: String,
        timestamp: f64,
        log_lines: Vec<JsonValue>,
        request_id: String,
        execution_id: String,
    },
}

#[derive(Serialize, Deserialize, Debug)]
#[cfg_attr(any(test, feature = "testing"), derive(utoipa::ToSchema))]
#[serde(rename_all = "camelCase")]
pub struct StreamUdfExecutionResponse {
    pub entries: Vec<FunctionExecutionJson>,
    pub new_cursor: f64,
}

#[derive(Deserialize, Debug)]
#[cfg_attr(any(test, feature = "testing"), derive(utoipa::ToSchema))]
#[serde(rename_all = "camelCase")]
pub struct StreamFunctionLogs {
    pub cursor: f64,
    pub session_id: Option<String>,
    pub client_request_counter: Option<u32>,
}

#[derive(Deserialize, Debug)]
#[cfg_attr(any(test, feature = "testing"), derive(utoipa::ToSchema))]
pub struct StreamUdfExecutionQueryArgs {
    pub cursor: f64,
}

#[cfg(test)]
mod tests {
    use serde::{
        Deserialize,
        Serialize,
    };
    use serde_json::{
        json,
        Value as JsonValue,
    };
    use utoipa::{
        OpenApi,
        ToSchema,
    };

    use crate::{
        components::ComponentPath,
        execution_context::ExecutionContext,
        log_lines::{
            LogLevel,
            LogLineStructured,
        },
        log_streaming::{
            AggregatedFunctionUsageStats,
            FunctionEventSource,
            LogEvent,
            LogEventFormatVersion,
            OccInfo,
            SchedulerInfo,
            StructuredLogEvent,
        },
        runtime::UnixTimestamp,
        types::{
            ModuleEnvironment,
            UdfType,
        },
    };

    #[test]
    fn test_serialization_of_console_log_event() -> anyhow::Result<()> {
        let timestamp = UnixTimestamp::from_millis(1000);
        let context = ExecutionContext::new_for_test();
        let request_id = context.request_id.clone();
        let event = LogEvent {
            timestamp,
            event: StructuredLogEvent::Console {
                source: FunctionEventSource {
                    context,
                    component_path: ComponentPath::test_user(),
                    udf_path: "test:test".to_string(),
                    udf_type: UdfType::Query,
                    module_environment: ModuleEnvironment::Isolate,
                    cached: Some(true),
                    mutation_queue_length: None,
                    mutation_retry_count: None,
                },
                log_line: LogLineStructured::new_developer_log_line(
                    LogLevel::Log,
                    vec!["my test log".to_string()],
                    timestamp,
                ),
            },
        };

        // Test serialization
        let fields: serde_json::Map<String, JsonValue> =
            event.to_json_map(LogEventFormatVersion::default())?;
        let value = serde_json::to_value(&fields)?;
        assert_eq!(
            value,
            json!({
                "topic": "console",
                "timestamp": 1000,
                "function": json!({
                    "path": "test:test",
                    "type": "query",
                    "cached": true,
                    "request_id": request_id.to_string(),
                    "mutation_queue_length": null,
                    "mutation_retry_count": null
                }),
                "log_level": "LOG",
                "message": "my test log",
                "is_truncated": false,
                "system_code": JsonValue::Null
            })
        );
        Ok(())
    }

    // Utoipa schemas for log stream events; these exist for documentation
    // only. They are cursorily tested to check that they can parse some event
    // log output.
    //
    // These types need to be updated manually.
    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
    #[allow(dead_code)]
    struct ConsoleLogEvent {
        timestamp: u64,
        #[schema(inline)]
        function: SchemaFunctionEventSource,
        log_level: String,
        message: String,
        is_truncated: bool,
        system_code: Option<String>,
    }

    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
    #[allow(dead_code)]
    struct SchemaFunctionEventSource {
        path: String,
        r#type: String,
        cached: Option<bool>,
        request_id: String,
        mutation_queue_length: Option<usize>,
        mutation_retry_count: Option<usize>,
        component_path: Option<String>,
    }

    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
    #[allow(dead_code)]
    struct SchemaOccInfo {
        table_name: Option<String>,
        document_id: Option<String>,
        write_source: Option<String>,
        retry_count: u64,
    }

    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
    #[allow(dead_code)]
    struct SchemaSchedulerInfo {
        job_id: String,
    }

    // Additional log event schemas
    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
    #[allow(dead_code)]
    struct VerificationEvent {
        timestamp: u64,
        message: String, // "Convex connection test"
    }

    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
    #[allow(dead_code)]
    struct FunctionExecutionEvent {
        timestamp: u64,
        #[schema(inline)]
        function: SchemaFunctionEventSource,
        execution_time_ms: u64,
        status: String, // "success" or "failure"
        error_message: Option<String>,
        #[schema(inline)]
        occ_info: Option<SchemaOccInfo>,
        #[schema(inline)]
        scheduler_info: Option<SchemaSchedulerInfo>,
        #[schema(inline)]
        usage: UsageStats,
    }

    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
    #[allow(dead_code)]
    struct UsageStats {
        database_read_bytes: u64,
        database_write_bytes: u64,
        database_read_documents: u64,
        file_storage_read_bytes: u64,
        file_storage_write_bytes: u64,
        vector_storage_read_bytes: u64,
        vector_storage_write_bytes: u64,
        memory_used_mb: u64,
        action_memory_used_mb: Option<u64>,
    }

    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
    #[allow(dead_code)]
    struct DeploymentAuditLogEvent {
        timestamp: u64,
        audit_log_action: String,
        audit_log_metadata: String, // JSON-stringified metadata
    }

    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
    #[allow(dead_code)]
    struct SchedulerStatsEvent {
        timestamp: u64,
        lag_seconds: u64,
        num_running_jobs: u64,
    }

    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
    #[allow(dead_code)]
    struct ScheduledJobLagEvent {
        timestamp: u64,
        lag_seconds: u64,
    }

    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
    #[allow(dead_code)]
    struct StorageUsageEvent {
        timestamp: u64,
        total_document_size_bytes: u64,
        total_index_size_bytes: u64,
        total_vector_storage_bytes: u64,
        total_file_storage_bytes: u64,
        total_backup_storage_bytes: u64,
    }

    // Union type for all log events, discriminated by topic field
    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
    #[serde(tag = "topic")]
    #[allow(dead_code)]
    enum LogStreamEvent {
        #[serde(rename = "console")]
        Console(ConsoleLogEvent),
        #[serde(rename = "verification")]
        Verification(VerificationEvent),
        #[serde(rename = "function_execution")]
        FunctionExecution(FunctionExecutionEvent),
        #[serde(rename = "audit_log")]
        DeploymentAuditLog(DeploymentAuditLogEvent),
        #[serde(rename = "scheduler_stats")]
        SchedulerStats(SchedulerStatsEvent),
        #[serde(rename = "scheduled_job_lag")]
        ScheduledJobLag(ScheduledJobLagEvent),
        #[serde(rename = "current_storage_usage")]
        CurrentStorageUsage(StorageUsageEvent),
    }

    #[test]
    fn test_v2_events_deserialize_to_schemas() -> anyhow::Result<()> {
        let verification_json = serde_json::to_value(
            &LogEvent {
                timestamp: UnixTimestamp::from_millis(1000),
                event: StructuredLogEvent::Verification,
            }
            .to_json_map(LogEventFormatVersion::V2)?,
        )?;
        let _: LogStreamEvent = serde_json::from_value(verification_json)?;

        let console_json = serde_json::to_value(
            &LogEvent {
                timestamp: UnixTimestamp::from_millis(2000),
                event: StructuredLogEvent::Console {
                    source: FunctionEventSource {
                        context: ExecutionContext::new_for_test(),
                        component_path: ComponentPath::test_user(),
                        udf_path: "test:console".to_string(),
                        udf_type: UdfType::Query,
                        module_environment: ModuleEnvironment::Isolate,
                        cached: Some(true),
                        mutation_queue_length: None,
                        mutation_retry_count: None,
                    },
                    log_line: LogLineStructured {
                        messages: vec!["test console log".to_string()].into(),
                        level: LogLevel::Log,
                        is_truncated: false,
                        timestamp: UnixTimestamp::from_millis(2000),
                        system_metadata: None,
                    },
                },
            }
            .to_json_map(LogEventFormatVersion::V2)?,
        )?;
        let _: LogStreamEvent = serde_json::from_value(console_json)?;

        let function_execution_json = serde_json::to_value(
            &LogEvent {
                timestamp: UnixTimestamp::from_millis(3000),
                event: StructuredLogEvent::FunctionExecution {
                    source: FunctionEventSource {
                        context: ExecutionContext::new_for_test(),
                        component_path: ComponentPath::test_user(),
                        udf_path: "test:function".to_string(),
                        udf_type: UdfType::Mutation,
                        module_environment: ModuleEnvironment::Isolate,
                        cached: None,
                        mutation_queue_length: Some(2),
                        mutation_retry_count: Some(0),
                    },
                    error: None,
                    execution_time: std::time::Duration::from_millis(100),
                    usage_stats: AggregatedFunctionUsageStats {
                        database_read_bytes: 512,
                        database_write_bytes: 256,
                        database_read_documents: 3,
                        storage_read_bytes: 0,
                        storage_write_bytes: 0,
                        vector_index_read_bytes: 0,
                        vector_index_write_bytes: 0,
                        memory_used_mb: 0,
                        return_bytes: Some(64),
                    },
                    occ_info: Some(OccInfo {
                        table_name: Some("test_table".to_string()),
                        document_id: Some("doc123".to_string()),
                        write_source: Some("mutation".to_string()),
                        retry_count: 1,
                    }),
                    scheduler_info: Some(SchedulerInfo {
                        job_id: "scheduled_job_456".to_string(),
                    }),
                },
            }
            .to_json_map(LogEventFormatVersion::V2)?,
        )?;
        let _: LogStreamEvent = serde_json::from_value(function_execution_json)?;

        let mut metadata = serde_json::Map::new();
        metadata.insert(
            "action".to_string(),
            serde_json::Value::String("deploy".to_string()),
        );
        let audit_log_json = serde_json::to_value(
            &LogEvent {
                timestamp: UnixTimestamp::from_millis(4000),
                event: StructuredLogEvent::DeploymentAuditLog {
                    action: "schema_push".to_string(),
                    metadata,
                },
            }
            .to_json_map(LogEventFormatVersion::V2)?,
        )?;
        let _: LogStreamEvent = serde_json::from_value(audit_log_json)?;

        let scheduler_stats_json = serde_json::to_value(
            &LogEvent {
                timestamp: UnixTimestamp::from_millis(5000),
                event: StructuredLogEvent::SchedulerStats {
                    lag_seconds: std::time::Duration::from_secs(10),
                    num_running_jobs: 25,
                },
            }
            .to_json_map(LogEventFormatVersion::V2)?,
        )?;
        let _: LogStreamEvent = serde_json::from_value(scheduler_stats_json)?;

        let job_lag_json = serde_json::to_value(
            &LogEvent {
                timestamp: UnixTimestamp::from_millis(6000),
                event: StructuredLogEvent::ScheduledJobLag {
                    lag_seconds: std::time::Duration::from_secs(5),
                },
            }
            .to_json_map(LogEventFormatVersion::V2)?,
        )?;
        let _: LogStreamEvent = serde_json::from_value(job_lag_json)?;

        Ok(())
    }

    // OpenAPI document for the function logs API (/api/stream_function_logs
    // and /api/stream_udf_execution)
    use crate::{
        log_lines::{
            LogLevelJson,
            LogLineJson,
            SystemLogMetadataJson,
        },
        log_streaming::{
            FunctionExecutionJson,
            OccInfoJson,
            StreamFunctionLogs,
            StreamUdfExecutionQueryArgs,
            StreamUdfExecutionResponse,
            UsageStatsJson,
        },
        types::UdfTypeJson,
    };

    #[derive(utoipa::OpenApi)]
    #[openapi(
        info(
            title = "Convex Function Logs API",
            version = "1.0.0",
            description = "Schema definitions for /api/stream_function_logs and \
                           /api/stream_udf_execution endpoints"
        ),
        components(schemas(
            FunctionExecutionJson,
            StreamUdfExecutionResponse,
            StreamFunctionLogs,
            StreamUdfExecutionQueryArgs,
            LogLineJson,
            LogLevelJson,
            SystemLogMetadataJson,
            UsageStatsJson,
            UdfTypeJson,
            OccInfoJson,
        ))
    )]
    struct FunctionLogsApiDoc;

    #[test]
    fn test_function_logs_api_schema_matches() -> anyhow::Result<()> {
        use std::{
            fs,
            path::Path,
        };

        const SCHEMA_FILE: &str = "../../npm-packages/convex/function-logs-openapi.json";

        // Generate OpenAPI spec using utoipa
        let openapi_spec = FunctionLogsApiDoc::openapi();
        let current_schema = openapi_spec.to_pretty_json()?;

        // Check if file exists and compare
        if Path::new(SCHEMA_FILE).exists() {
            let existing_schema = fs::read_to_string(SCHEMA_FILE)?;
            if existing_schema.trim() != current_schema.trim() {
                // Write updated schema
                fs::write(SCHEMA_FILE, &current_schema)?;
                panic!(
                    "{SCHEMA_FILE} does not match current schema. This test automatically updated \
                     the file so you can run again: `cargo test -p common \
                     test_function_logs_api_schema_matches`"
                );
            }
        } else {
            // Create directory if it doesn't exist
            if let Some(parent) = Path::new(SCHEMA_FILE).parent() {
                fs::create_dir_all(parent)?;
            }
            fs::write(SCHEMA_FILE, &current_schema)?;
            panic!(
                "Created new {SCHEMA_FILE}. Run the test again to verify: `cargo test -p common \
                 test_function_logs_api_schema_matches`"
            );
        }

        Ok(())
    }

    // OpenAPI document for log stream events (for external sinks)
    #[derive(utoipa::OpenApi)]
    #[openapi(
        info(
            title = "Convex Log Stream Events",
            version = "2.0.0",
            description = "Schema definitions for Convex log stream events"
        ),
        components(schemas(LogStreamEvent))
    )]
    struct LogStreamApiDoc;

    #[test]
    fn test_log_stream_api_schema_matches() -> anyhow::Result<()> {
        use std::{
            fs,
            path::Path,
        };

        const SCHEMA_FILE: &str = "../../npm-packages/@convex-dev/platform/log-stream-openapi.json";

        // Generate OpenAPI spec using utoipa
        let openapi_spec = LogStreamApiDoc::openapi();
        let current_schema = openapi_spec.to_pretty_json()?;

        // Check if file exists and compare
        if Path::new(SCHEMA_FILE).exists() {
            let existing_schema = fs::read_to_string(SCHEMA_FILE)?;
            if existing_schema.trim() != current_schema.trim() {
                // Write updated schema
                fs::write(SCHEMA_FILE, &current_schema)?;
                panic!(
                    "{SCHEMA_FILE} does not match current schema. This test automatically updated \
                     the file so you can run again: `cargo test -p common \
                     test_log_stream_api_schema_matches`"
                );
            }
        } else {
            // Create directory if it doesn't exist
            if let Some(parent) = Path::new(SCHEMA_FILE).parent() {
                fs::create_dir_all(parent)?;
            }
            fs::write(SCHEMA_FILE, &current_schema)?;
            panic!(
                "Created new {SCHEMA_FILE}. Run the test again to verify: `cargo test -p common \
                 test_log_stream_api_schema_matches`"
            );
        }

        Ok(())
    }
}
